Feature/pass airflow confi as job param #39007

Closed

SubhamSinghal wants to merge 4 commits into apache:main from
SubhamSinghal:feature/pass-airflow-confi-as-job-param
Conversation

@SubhamSinghal
Contributor

@SubhamSinghal SubhamSinghal commented Apr 14, 2024

Fixes: #39002

Tests:

  1. empty Airflow config and empty job parameters -- job parameters stay empty
  2. empty Airflow config and job parameters set on the Databricks operator -- job params from the operator are applied
  3. non-empty Airflow config with empty job parameters -- Airflow config is passed as job params
  4. non-empty Airflow config with non-empty job parameters -- job params from the operator are applied
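
The precedence described by the four test cases above can be sketched as a small helper. This is not code from the PR; `resolve_job_params` is a hypothetical function written only to illustrate the rule that operator-level job parameters win and the Airflow config is a fallback.

```python
# Hypothetical sketch of the precedence in the test matrix above:
# operator-level job parameters take priority; otherwise the Airflow
# config (params) is used as the job parameters.

def resolve_job_params(airflow_params: dict, operator_job_params: dict) -> dict:
    """Return the parameters that should reach the Databricks job."""
    if operator_job_params:          # cases 2 and 4: operator params win
        return operator_job_params
    return airflow_params            # cases 1 and 3: fall back to Airflow config
```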

Comment thread airflow/providers/databricks/operators/databricks.py Outdated
@SubhamSinghal
Contributor Author

@dirrao Let me know if it makes more sense to add a flag "import_airflow_config" that, when True, sets the Airflow config as job_params in Databricks jobs.

@eladkal eladkal requested review from Lee-W and pankajkoti April 16, 2024 08:00
Comment on lines +317 to +321
job_params = self.params.items() if self.params.items() else {}
param_list = []
for k, v in job_params:
param_list.append({"name": k, "default": v})
self.json["parameters"] = param_list
Member

Suggested change
job_params = self.params.items() if self.params.items() else {}
param_list = []
for k, v in job_params:
param_list.append({"name": k, "default": v})
self.json["parameters"] = param_list
if self.params:
    self.json["parameters"] = [{"name": k, "default": v} for k, v in self.params.items()]
else:
    self.json["parameters"] = {}

Member

Where is this self.params from?

Contributor Author

These are Airflow configs.

@github-actions
Contributor

github-actions Bot commented Jul 8, 2024

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions Bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jul 8, 2024
@github-actions github-actions Bot closed this Jul 14, 2024
moomindani added a commit to moomindani/airflow that referenced this pull request May 9, 2026
Apply Lee-W's review suggestion from PR apache#39007: replace the manual loop
with a list comprehension that uses ``params.dump()`` (the original
``params.items()`` iteration yielded ``Param`` objects rather than the
resolved values, which would not serialise into the Databricks API).

Extend the same pattern to:
* DatabricksRunNowOperator -> populate top-level ``job_parameters`` (the
  dict-shaped slot already supported by the run-now endpoint).
* DatabricksSubmitRunOperator -> populate dict-shaped per-task parameter
  fields (notebook_task.base_parameters, python_wheel_task.named_parameters,
  sql_task.parameters, run_job_task.job_parameters). Tasks whose only
  parameter field is ``List[str]`` (spark_jar_task, spark_python_task,
  spark_submit_task) are intentionally skipped because there is no
  canonical mapping from a key/value dict to positional CLI arguments.

Drop the ``"parameters": []`` expectation that was added to the existing
test_exec_create / test_exec_reset cases by PR apache#39007 — it never matched
the source logic (``self.params`` is falsy when no params are set, so no
``parameters`` key is added).

Add tests covering: auto-injection for each operator, no override when
the field is already populated, and the per-task injection rules for
SubmitRun.
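
The conversion pattern the commit message describes can be sketched as follows. This is a minimal illustration, not the provider's actual code: it assumes the params have already been resolved to plain key/value pairs (which, per the commit message, is what ``params.dump()`` yields, in contrast to iterating ``params.items()``), and `to_databricks_parameters` is a hypothetical helper name.

```python
# Minimal sketch: turn resolved Airflow params (plain key/value pairs,
# e.g. the output of ``params.dump()``) into the name/default list shape
# used for the job-level ``parameters`` field.

def to_databricks_parameters(resolved_params: dict) -> list[dict]:
    return [{"name": k, "default": v} for k, v in resolved_params.items()]
```

For the dict-shaped per-task fields mentioned above (e.g. `notebook_task.base_parameters`), the resolved dict can be passed through unchanged, which is why only the list-shaped fields need this conversion.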
Labels

area:providers provider:databricks stale Stale PRs per the .github/workflows/stale.yml policy file

Development

Successfully merging this pull request may close these issues.

Allow passing airflow params as job parameter in databricks job

4 participants